package com.gy.hadoop.mr.inputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

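/**
 * Merges multiple small files into a single SequenceFile (SequenceFile is the Hadoop file format
 * for storing key-value pairs in binary form). The resulting SequenceFile holds many files: each
 * entry's key is the file's path plus name, and its value is the file's content.
 */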
public class MyInputformat {

    /**
     * Configures and runs the job that packs the input files into a SequenceFile.
     *
     * <p>The job runs with {@code mapreduce.framework.name} set to {@code local}, reads input
     * through {@link WholeFileInputformat} and writes through {@link SequenceFileOutputFormat}.
     * The JVM exits with status 0 when the job succeeds and 1 otherwise.
     *
     * @param args optional {@code [inputPath, outputPath]}; when fewer than two arguments are
     *             supplied, the sample paths {@code mr/input} and {@code mr/out/seq} are used
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved while waiting for completion
     * @throws InterruptedException   if the wait for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Use the sample paths only as a fallback, so paths given on the command line are honoured.
        if (args == null || args.length < 2) {
            args = new String[]{"mr/input", "mr/out/seq"};
        }

        // 1. Create the job
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(conf);

        // 2. Set the jar location and wire up the custom mapper and reducer
        job.setJarByClass(MyInputformat.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 3. Set the input format (one whole file per record)
        job.setInputFormatClass(WholeFileInputformat.class);

        // 4. Set the output format
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 5. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 6. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 7. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8. Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    /**
     * Input format that never splits a file and reads each split through a
     * {@link WholeRecordReader}, producing {@code Text} keys and {@code BytesWritable} values.
     */
    static class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

        /**
         * Builds and initializes the reader for the given split.
         *
         * @param split   the split to read
         * @param context the task attempt context handed to the reader
         * @return an initialized {@link WholeRecordReader}
         */
        @Override
        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            RecordReader<Text, BytesWritable> reader = new WholeRecordReader();
            // NOTE(review): the framework is documented to call initialize() itself before using
            // the reader, so this explicit call is presumably redundant — confirm before removing.
            reader.initialize(split, context);
            return reader;
        }

        /**
         * Files are never split.
         *
         * @return always {@code false}
         */
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    }

    /**
     * Record reader that produces exactly one record per split: the key is the split's file
     * path (as a string) and the value is {@code split.getLength()} bytes read from that file.
     */
    static class WholeRecordReader extends RecordReader<Text, BytesWritable> {

        private Configuration configuration;
        private FileSplit split;

        // true until the single record of this split has been produced
        private boolean isProgress = true;

        private final BytesWritable value = new BytesWritable();
        private final Text k = new Text();

        /**
         * Stores the split and the job configuration for use by {@link #nextKeyValue()}.
         *
         * @param split   the split to read; must be a {@link FileSplit}
         * @param context the task attempt context supplying the configuration
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            this.split = (FileSplit) split;
            configuration = context.getConfiguration();
        }

        /**
         * Reads the file into the current key/value pair on the first call.
         *
         * @return {@code true} on the first call, once the record has been loaded;
         *         {@code false} on every later call
         * @throws IOException if the split is too large to hold in one byte array or the file
         *                     cannot be opened or fully read
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!isProgress) {
                return false;
            }

            Path path = split.getPath();

            // A Java array is indexed by int; fail loudly instead of silently truncating the cast.
            long length = split.getLength();
            if (length > Integer.MAX_VALUE) {
                throw new IOException("File is too large to read as a single record ("
                        + length + " bytes): " + path);
            }
            byte[] contents = new byte[(int) length];

            FileSystem fs = path.getFileSystem(configuration);

            // Read failures propagate to the caller instead of being swallowed, so a broken
            // file is never emitted as a record with an empty key and value.
            try (FSDataInputStream fis = fs.open(path)) {
                IOUtils.readFully(fis, contents, 0, contents.length);
            }

            value.set(contents, 0, contents.length);
            k.set(path.toString());

            isProgress = false;
            return true;
        }

        /** @return the key object, set to the file path once the record has been read */
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return k;
        }

        /** @return the value object, set to the file content once the record has been read */
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        /** @return 0 before the single record has been produced, 1 afterwards */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return isProgress ? 0.0f : 1.0f;
        }

        /** Nothing to release here: the input stream is closed inside {@link #nextKeyValue()}. */
        @Override
        public void close() throws IOException {
        }
    }

    /**
     * Pass-through mapper: writes every incoming key/value pair to the context unchanged.
     */
    static class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

        /**
         * Emits the received pair as-is.
         *
         * @param key     the incoming key
         * @param value   the incoming value
         * @param context the context the pair is written to
         */
        @Override
        protected void map(Text key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // Identity mapping: no transformation is applied.
            context.write(key, value);
        }
    }

    /**
     * Reducer that writes, for each key, only the first value of its group.
     */
    static class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

        /**
         * Emits the key together with the first element of {@code values}.
         *
         * @param key     the group key
         * @param values  the values grouped under {@code key}; only the first one is written
         * @param context the context the pair is written to
         */
        @Override
        protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException {
            java.util.Iterator<BytesWritable> cursor = values.iterator();
            BytesWritable first = cursor.next();
            context.write(key, first);
        }
    }
}

